package org.zjvis.datascience.service.dag;

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.zjvis.datascience.common.dto.TaskInstanceDTO;
import org.zjvis.datascience.common.util.db.JDBCUtil;
import org.zjvis.datascience.service.dataprovider.GPDataProvider;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.concurrent.Callable;

/**
 * @description 常规任务ETL等执行调度器
 * @date 2021-12-10
 */
public class TaskRunner implements Callable<TaskRunnerResult> {
    private String errorTpl = "{\"status\":500, \"error_msg\":\"%s\"}";
    private int maxRetryTimes = 3;
    private int sleepTime = 20000;
    private GPDataProvider provider;
    private TaskInstanceDTO instance;

    public TaskRunner(GPDataProvider provider, TaskInstanceDTO instance) {
        this.provider = provider;
        this.instance = instance;
    }

    @Override
    public TaskRunnerResult call() throws InterruptedException {
        int index = 0;
        TaskRunnerResult result = null;
        if (instance.hasPrecautionaryError()) {
            return new TaskRunnerResult(500, String.format(errorTpl, "error happens when init stage."));
        }
        while (index < maxRetryTimes) {
            try {
                result = this.exec();
                if (result.getStatus() == 0) {
                    break;
                }
                ++index;
            } catch (Exception e) {
                ++index;
                Thread.sleep(sleepTime);
                result = new TaskRunnerResult(500, String.format(errorTpl, e.getMessage().replaceAll("\"", "'")));
            }
        }
        return result;
    }

    public TaskRunnerResult exec() {
        String sql = instance.getSqlText();
        // TODO 判断状态
        if (StringUtils.isEmpty(sql)) {
            return new TaskRunnerResult(0, null);
        }

        Connection conn = null;
        Statement st;
        try {
            conn = provider.getConn(1L);
            st = conn.createStatement();
        } catch (Exception e) {
            JDBCUtil.close(conn, null, null);
            return new TaskRunnerResult(500, String.format(errorTpl, e.getMessage().replaceAll("\"", "'")));
        }

        if (sql.startsWith("CREATE VIEW") || sql.startsWith("create view") || sql.toLowerCase().startsWith("create table")) {
            try {
                st.execute(sql);
            } catch (Exception e) {
                return new TaskRunnerResult(500, String.format(errorTpl, e.getMessage().replaceAll("\"", "'")));
            } finally {
                JDBCUtil.close(conn, st, null);
            }
            return new TaskRunnerResult(0, null);
        }

        String output = "";
        int status = -1;
        try {
            ResultSet rs = st.executeQuery(sql);
            while (rs.next()) {
                output = rs.getString(1);
                break;
            }
            if (sql.contains("sys_func_pivot_table_view_create")) {
                if ("成功".equals(output)) {
                    return new TaskRunnerResult(0, null);
                } else {
                    status = 500;
                }
            } else {
                JSONObject jsonObject = JSONObject.parseObject(output);
                status = jsonObject.getInteger("status");
            }
        } catch (Exception e) {
            return new TaskRunnerResult(500, String.format(errorTpl, e.getMessage().replaceAll("\"", "'")));
        } finally {
            JDBCUtil.close(conn, st, null);
        }

        return new TaskRunnerResult(status, output);
    }

}
